 1.Kafka高级特性解析之延时队列
   
   两个follower副本都已经拉取到了leader副本的最新位置，此时又向leader副本发送拉取请求，而leader副本并没
有新的消息写入，那么此时leader副本该如何处理呢？可以直接返回空的拉取结果给follower副本，不过在leader副本
一直没有新消息写入的情况下，follower副本会一直发送拉取请求，并且总收到空的拉取结果，消耗资源。
   Kafka在处理拉取请求时，会先读取⼀次⽇志⽂件，如果收集不到足够多（fetchMinBytes，由参数fetch.min.bytes
配置，默认值为1）的消息，那么就会创建一个延时拉取操作（DelayedFetch）以等待拉取到足够数量的消息。当延
时拉取操作执行时，会再读取一次日志文件，然后将拉取结果返回给follower副本。
   延迟操作不只是拉取消息时的特有操作，在Kafka中有多种延时操作，⽐如延时数据删除、延时生产等。
   对于延时生产（消息）而言，如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要
等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果，或者捕获超时异常。
   假设某个分区有3个副本：leader、follower1和follower2，它们都在分区的ISR集合中。不考虑ISR变动的情况，
Kafka在收到客户端的生产请求后，将消息3和消息4写⼊leader副本的本地日志文件。
   由于客户端设置了acks为-1,那么需要等到follower1和follower2两个副本都收到消息3和消息4后才能告知客户
端正确地接收了所发送的消息。如果在一定的时间内，follower1副本或follower2副本没能够完全拉取到消息3和消息
4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置，默认值为30000，即30s。
   那么这里等待消息3和消息4写⼊follower1副本和follower2副本，并返回相应的响应结果给客户端的动作是由谁
来执行的呢？在将消息写入leader副本的本地日志文件之后，Kafka会创建一个延时的生产操作(DelayedProduce)，
用来处理消息正常写入所有副本或超时的情况，以返回相应的响应结果给客户端。
   延时操作需要延时返回响应的结果，首先它必须有一个超时时间(delayMs)，如果在这个超时时间内没有完成既
定的任务，那么就需要强制完成以返回响应结果给客户端。其次，延时操作不同于定时操作，定时操作是指在特定
时间之后执行的操作，而延时操作可以在所设定的超时时间之前完成，所以延时操作能够支持外部事件的触发。
   就延时生产操作而言，它的外部事件是所要写⼊消息的某个分区的HW（高水位）发生增长。也就是说，随着
follower副本不断地与leader副本进行消息同步，进⽽促使HW进一步增长，HW每增长一次都会检测是否能够完成此
次延时生产操作，如果可以就执行以此返回响应结果给客户端；如果在超时时间内始终无法完成，则强制执行。
   延时拉取操作，是由超时触发或外部事件触发而被执行的。超时触发很好理解，就是等到超时时间之后触发第二
次读取日志文件的操作。外部事件触发就稍复杂了一些，因为拉取请求不单单由follower副本发起，也可以由消费者
客户端发起，两种情况所对应的外部事件也是不同的。如果是follower副本的延时拉取，它的外部事件就是消息追加
到了leader副本的本地日志文件中；如果是消费者客户端的延时拉取，它的外部事件可以简单地理解为HW的增长。
   时间轮实现延时队列。
   TimeWheel。size，每个单元格的时间，每个单元格都代表一个时间，size*每个单元格的时间就是一个周期。
 
 2.重试队列
   
   kafka没有重试机制不支持消息重试，也没有死信队列，因此使用kafka做消息队列时，需要自己实现消息重试的
功能。
   实现
   创建新的kafka主题作为重试队列：
   (1). 创建一个topic作为重试topic，用于接收等待重试的消息。
   (2). 普通topic消费者设置待重试消息的下一个重试topic。
   (3). 从重试topic获取待重试消息储存到redis的zset中，并以下一次消费时间排序
   (4). 定时任务从redis获取到达消费事件的消息，并把消息发送到对应的topic
   (5). 同一个消息重试次数过多则不再重试
   代码实现
   1).新建springboot项目
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lagou.kafka.demo</groupId>
    <artifactId>demo-retryqueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-retryqueue</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-json -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-json</artifactId>
            <version>5.7.9</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.9</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

   2).添加application.properties
# bootstrap.servers
spring.kafka.bootstrap-servers=linux121:9092
# key序列化器
spring.kafka.producer.key-serializer=
org.apache.kafka.common.serialization.StringSerializer
# value序列化器
spring.kafka.producer.value-serializer=
org.apache.kafka.common.serialization.StringSerializer

# 消费组id：group.id
spring.kafka.consumer.group-id=retryGroup
# key反序列化器
spring.kafka.consumer.key-deserializer=
org.apache.kafka.common.serialization.StringDeserializer
# value反序列化器
spring.kafka.consumer.value-deserializer=
org.apache.kafka.common.serialization.StringDeserializer
# redis数据库编号
spring.redis.database=0
# redis主机地址
spring.redis.host=linux121
# redis端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数（使用负值表示没有限制）
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间（使用负值表示没有限制）
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间（毫秒）
spring.redis.timeout=1000

# Kafka主题名称
spring.kafka.topics.test=tp_demo_retry_01
# 重试队列
spring.kafka.topics.retry=tp_demo_retry_02

   3).RetryqueueApplication.java
package com.lagou.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RetryqueueApplication {
    public static void main(String[] args) {

        SpringApplication.run(RetryqueueApplication.class, args);
    }
}

   4).AppConfig.java
package com.lagou.kafka.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class AppConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置连接工厂
        template.setConnectionFactory(factory);

        return template;
    }
}

   5).RetryController.java
package com.lagou.kafka.demo.controller;

import com.lagou.kafka.demo.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class RetryController {
    @Autowired
    private KafkaService kafkaService;
    @Value("${spring.kafka.topics.test}")
    private String topic;

    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) throws
            ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                topic,
                message
        );
        String result = kafkaService.sendMessage(record);
        return result;
    }
}
   6).KafkaService.java
package com.lagou.kafka.demo.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws
            ExecutionException, InterruptedException {
        SendResult<String, String> result = this.kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() +
                "\t" + metadata.offset();
        System.out.println("发送消息成功：" + returnResult);
        return returnResult;
    }
}
   7).ConsumerListener.java
package com.lagou.kafka.demo.listener;
//import com.lagou.kafka.demo.service.KafkaRetryService;

import com.lagou.kafka.demo.service.RetryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {
    private static final Logger log =
            LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    private RetryService kafkaRetryService;
    private static int index = 0;

    @KafkaListener(topics = "${spring.kafka.topics.test}", groupId =
            "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, String> record) {
        try {
            // 业务处理
            log.info("消费的消息：" + record);
            index++;
            if (index % 2 == 0) {
                throw new Exception("该重发了");
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            // 消息重试
            kafkaRetryService.consumerLater(record);
        }
    }
}
   8).RetryService.java
package com.lagou.kafka.demo.service;

import cn.hutool.json.JSONUtil;
//import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;

@Service
public class RetryService {
    private static final Logger log =
            LoggerFactory.getLogger(RetryService.class);
    /**
     * 消息消费失败后下一次消费的延迟时间(秒)
     * 第一次重试延迟10秒;第 二次延迟30秒,第三次延迟1分钟...
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1 * 60, 2 * 60,
            5 * 60, 10 * 60, 30 * 60, 1 * 60 * 60, 2 * 60 * 60};
    /**
     * 重试topic
     */
    @Value("${spring.kafka.topics.retry}")
    private String retryTopic;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record) {
        // 获取消息的已重试次数
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        // 如果达到重试次数，则不再重试
        if (nextConsumerTime == null) {
            return;
        }
        // 组织消息
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());
        // 转换为字符串

        String value = JSONUtil.toJsonStr(retryRecord);
        // 发送到重试队列
        kafkaTemplate.send(retryTopic, null, value);
    }

    /**
     * 获取消息的已重试次数
     */
    private int getRetryTimes(ConsumerRecord record) {
        int retryTimes = -1;
        for (Header header : record.headers()) {
            if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer buffer = ByteBuffer.wrap(header.value());
                retryTimes = buffer.getInt();
            }
        }
        retryTimes++;
        return retryTimes;
    }

    /**
     * 获取待重试消息的下一次消费时间
     */
    private Date getNextConsumerTime(int retryTimes) {
        // 重试次数超过上限,不再重试
        if (RETRY_INTERVAL_SECONDS.length < retryTimes) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }
}

   9).RetryListener.java
package com.lagou.kafka.demo.listener;

import cn.hutool.json.JSONUtil;
//import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {
    private Logger log = LoggerFactory.getLogger(RetryListener.class);
    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Value("${spring.kafka.topics.test}")
    private String bizTopic;

    @KafkaListener(topics = "${spring.kafka.topics.retry}")
// public void consume(List<ConsumerRecord<String, String>> list) {
// for(ConsumerRecord<String, String> record : list){
    public void consume(ConsumerRecord<String, String> record) {
        System.out.println("需要重试的消息：" + record);

        RetryRecord retryRecord = JSONUtil.toBean(record.value(), RetryRecord.class);
/**
 * 防⽌待重试消息太多撑爆redis,可以将待重试消息按下一次重试时间分开存储放到不
 同介质
 * 例如下一次重试时间在半小时以后的消息储存到mysql,并定时从mysql读取即将重试
 的消息储储存到redis
 */
// 通过redis的zset进行时间排序
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key,
                record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key,
                retryRecord.getNextTime());
    }
// }

    /**
     * 定时任务从redis读取到达重试时间的消息,发送到对应的topic
     */
//     @Scheduled(cron="2 * * * * *")
    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        log.warn("retryFromRedis----begin");
        long currentTime = System.currentTimeMillis();
        // 根据时间倒序获取
        Set<ZSetOperations.TypedTuple<Object>> typedTuples =
                redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0,
                        currentTime);
        // 移除取出的消息
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0,
                currentTime);
        for (ZSetOperations.TypedTuple<Object> tuple : typedTuples) {
            String key = tuple.getValue().toString();
            String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP,
                    key).toString();
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);

            RetryRecord retryRecord = JSONUtil.toBean(value, RetryRecord.class);
            ProducerRecord record = retryRecord.parse();
            ProducerRecord recordReal = new ProducerRecord(
                    bizTopic,
                    record.partition(),
                    record.timestamp(),
                    record.key(),
                    record.value(),
                    record.headers()
            );
            kafkaTemplate.send(recordReal);
        }
        // todo 发生异常将发送失败的消息重新发送到redis
    }
}

   10).RetryRecord.java
package com.lagou.kafka.demo.entity;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RetryRecord {
    public static final String KEY_RETRY_TIMES = "retryTimes";
    private String key;
    private String value;
    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public RetryRecord() {
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getNextTime() {
        return nextTime;
    }

    public void setNextTime(Long nextTime) {
        this.nextTime = nextTime;
    }

    public ProducerRecord parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<Header>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));
        ProducerRecord sendRecord = new ProducerRecord(
                topic, partition, timestamp, key, value, headers);
        return sendRecord;
    }
}

